feat: implement self-healing recovery mechanism for gaps in L1 data#403
feat: implement self-healing recovery mechanism for gaps in L1 data #403 — jonastheis wants to merge 45 commits into main from
Conversation
CodSpeed Performance Report: Merging #403 will not alter performance. Comparing Summary
|
… blocks if the send channel is full
…in ChainOrchestrator
frisitano
left a comment
There was a problem hiding this comment.
Added some comments inline.
| // testing | ||
| #[cfg(feature = "test-utils")] | ||
| { | ||
| let (tx, rx) = tokio::sync::mpsc::channel(1000); |
There was a problem hiding this comment.
Can we create an L1 watcher handle and receiver channel here that can be used for testing?
crates/database/db/src/operations.rs
Outdated
| async fn get_batch_by_index( | ||
| &self, | ||
| batch_index: u64, | ||
| processed: Option<bool>, |
There was a problem hiding this comment.
What's the purpose of adding the processed filter?
There was a problem hiding this comment.
Thought I needed it at some point. reverted.
crates/watcher/src/handle/command.rs
Outdated
| /// New sender to replace the current notification channel | ||
| new_sender: mpsc::Sender<Arc<L1Notification>>, | ||
| /// Oneshot sender to signal completion of the reset operation | ||
| response_sender: oneshot::Sender<()>, |
There was a problem hiding this comment.
not really needed. removed.
crates/watcher/src/handle/mod.rs
Outdated
| /// This trait allows the chain orchestrator to send commands to the L1 watcher, | ||
| /// primarily for gap recovery scenarios. | ||
| #[async_trait::async_trait] | ||
| pub trait L1WatcherHandleTrait: Send + Sync + 'static { |
There was a problem hiding this comment.
What value does a trait add here as opposed to using a concrete type? Do we intend to have some sort of genericness on the handle?
There was a problem hiding this comment.
yeah thought I'd need it for testing. removed now.
crates/watcher/src/handle/mod.rs
Outdated
| pub struct MockL1WatcherHandle { | ||
| /// Track reset calls as (`block_number`, `channel_capacity`) | ||
| resets: Arc<std::sync::Mutex<Vec<(u64, usize)>>>, | ||
| } |
There was a problem hiding this comment.
Why do we need this? Can't we just inspect the receiver channel directly? I think we would then be able to remove MockL1WatcherHandle and the L1WatcherHandleTrait and just use the L1WatcherHandle directly. I think this would result in simpler code.
There was a problem hiding this comment.
yeah thought I'd need it for testing. removed now.
crates/chain-orchestrator/src/lib.rs
Outdated
| /// A receiver for [`L1Notification`]s from the [`rollup_node_watcher::L1Watcher`]. | ||
| l1_notification_rx: Receiver<Arc<L1Notification>>, | ||
| /// Handle to send commands to the L1 watcher (e.g., for gap recovery). | ||
| l1_watcher_handle: Option<H>, |
There was a problem hiding this comment.
removed the option
crates/chain-orchestrator/src/lib.rs
Outdated
| ) { | ||
| Err(ChainOrchestratorError::L1MessageQueueGap(queue_index)) => { | ||
| // Query database for the L1 block of the last known L1 message | ||
| let reset_block = | ||
| self.database.get_last_l1_message_l1_block().await?.unwrap_or(0); | ||
| // TODO: handle None case (no messages in DB) | ||
|
|
||
| tracing::warn!( | ||
| target: "scroll::chain_orchestrator", | ||
| "L1 message queue gap detected at index {}, last known message at L1 block {}", | ||
| queue_index, | ||
| reset_block | ||
| ); | ||
|
|
||
| // Trigger gap recovery | ||
| self.trigger_gap_recovery(reset_block, "L1 message queue gap").await?; | ||
|
|
||
| // Return no event, recovery will re-process | ||
| Ok(None) | ||
| } | ||
| Err(ChainOrchestratorError::DuplicateL1Message(queue_index)) => { | ||
| tracing::info!( | ||
| target: "scroll::chain_orchestrator", | ||
| "Duplicate L1 message detected at {:?}, skipping", | ||
| queue_index | ||
| ); | ||
| // Return no event, as the message has already been processed | ||
| Ok(None) | ||
| } | ||
| result => result, | ||
| } |
There was a problem hiding this comment.
Why don't we embed this logic inside of handle_l1_message?
crates/chain-orchestrator/src/lib.rs
Outdated
| match metered!(Task::BatchCommit, self, handle_batch_commit(batch.clone())) { | ||
| Err(ChainOrchestratorError::BatchCommitGap(batch_index)) => { | ||
| // Query database for the L1 block of the last known batch | ||
| let reset_block = | ||
| self.database.get_last_batch_commit_l1_block().await?.unwrap_or(0); | ||
| // TODO: handle None case (no batches in DB) | ||
|
|
||
| tracing::warn!( | ||
| target: "scroll::chain_orchestrator", | ||
| "Batch commit gap detected at index {}, last known batch at L1 block {}", | ||
| batch_index, | ||
| reset_block | ||
| ); | ||
|
|
||
| // Trigger gap recovery | ||
| self.trigger_gap_recovery(reset_block, "batch commit gap").await?; | ||
|
|
||
| // Return no event, recovery will re-process | ||
| Ok(None) | ||
| } | ||
| Err(ChainOrchestratorError::DuplicateBatchCommit(batch_info)) => { | ||
| tracing::info!( | ||
| target: "scroll::chain_orchestrator", | ||
| "Duplicate batch commit detected at {:?}, skipping", | ||
| batch_info | ||
| ); | ||
| // Return no event, as the batch has already been processed | ||
| Ok(None) | ||
| } | ||
| result => result, | ||
| } |
There was a problem hiding this comment.
Why don't we embed this logic in handle_batch_commit?
crates/chain-orchestrator/src/lib.rs
Outdated
| /// # Arguments | ||
| /// * `reset_block` - The L1 block number to reset to (last known good state) | ||
| /// * `gap_type` - Description of the gap type for logging | ||
| async fn trigger_gap_recovery( |
There was a problem hiding this comment.
If we embed the L1Notification channel inside of the L1WatcherHandle then we can implement this logic on the L1WatcherHandle directly enabling better encapsulation.
…aling-l1-events Conflicts: crates/chain-orchestrator/src/lib.rs crates/node/src/args.rs crates/watcher/src/lib.rs crates/watcher/tests/indexing.rs crates/watcher/tests/logs.rs crates/watcher/tests/reorg.rs
…s with L1 message queue hash calculation
| } | ||
|
|
||
| // Check if batch already exists in DB. | ||
| for existing_batch in tx.get_batch_by_index(batch.index).await? { |
There was a problem hiding this comment.
@frisitano is this for loop maybe wrong? It may result in a loop while holding the lock.
| // This means we have already processed this batch commit, we will skip | ||
| // it. | ||
| return Ok(Some(BatchCommitDuplicate(existing_batch.index))); | ||
| } else if existing_batch.reverted_block_number.is_none() { |
There was a problem hiding this comment.
| } else if existing_batch.reverted_block_number.is_none() { | |
| } else if existing_batch.reverted_block_number.is_some() { |
??
|
Closing as stale. |
This PR implements a self-healing gap recovery mechanism for L1 messages and batch events. The actual gap detection happens in the
ChainOrchestrator, which subsequently notifies the L1Watcher that it needs to reset. Specifically, the following changes are implemented:
Fixes: #328, #235